package com.atuguigu.flink.Day06;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Timestamp;
import java.util.List;
import java.util.Map;

/**
 * Order payment timeout detection with Flink CEP.
 *
 * <p>Each event is a {@code Tuple3} of (order id, event type, event-time timestamp in ms).
 * For every order, a "create" event must be directly followed by a "pay" event within
 * 5 seconds. Completed matches are printed on the main output; partial matches that
 * exceed the time window are routed to the {@link #timeout} side output.
 */
public class Example4 {
    /**
     * Side-output tag for orders whose payment did not arrive in time.
     * Declared as an anonymous subclass so the generic element type is retained.
     */
    private static final OutputTag<String> timeout = new OutputTag<String>("out") {};

    /**
     * Builds and runs the job on a small bounded sample stream.
     *
     * @param args unused
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Sample events; the third tuple field is used as the event-time timestamp.
        SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env
                .fromElements(
                        Tuple3.of("order1", "create", 1000L),
                        Tuple3.of("order2", "create", 2000L),
                        Tuple3.of("order1", "pay", 3000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                                        return element.f2;
                                    }
                                })
                );

        // Define the detection pattern: "create" strictly followed by "pay" within 5 seconds.
        Pattern<Tuple3<String, String, Long>, Tuple3<String, String, Long>> pattern = Pattern
                .<Tuple3<String, String, Long>>begin("create")
                .where(new SimpleCondition<Tuple3<String, String, Long>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Long> value) throws Exception {
                        return value.f1.equals("create");
                    }
                })
                .next("pay")
                .where(new SimpleCondition<Tuple3<String, String, Long>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Long> value) throws Exception {
                        return value.f1.equals("pay");
                    }
                })
                .within(Time.seconds(5));

        // Apply the pattern per order id. Without keying, the pattern would be evaluated
        // over the interleaved events of all orders, so one order's "create" could be
        // paired with another order's "pay" and unpaid orders would go unreported.
        PatternStream<Tuple3<String, String, Long>> patternStream =
                CEP.pattern(orderStream.keyBy(r -> r.f0), pattern);

        SingleOutputStreamOperator<String> result = patternStream
                .flatSelect(
                        timeout,
                        // Partial matches that timed out: only the "create" event is available.
                        new PatternFlatTimeoutFunction<Tuple3<String, String, Long>, String>() {
                            @Override
                            public void timeout(Map<String, List<Tuple3<String, String, Long>>> match, long timeoutTimestamp, Collector<String> out) throws Exception {
                                out.collect("订单：" + match.get("create").get(0).f0 + "订单超时");
                            }
                        },
                        // Completed matches: the order was paid within the time window.
                        new PatternFlatSelectFunction<Tuple3<String, String, Long>, String>() {
                            @Override
                            public void flatSelect(Map<String, List<Tuple3<String, String, Long>>> match, Collector<String> out) throws Exception {
                                out.collect("订单:" + match.get("pay").get(0).f0 + "支付成功");
                            }
                        }
                );

        // Main output carries successful payments; the side output carries timeouts.
        result.print();
        result.getSideOutput(timeout).print();

        env.execute();
    }
}
